Skip to content

08 流式输出

上一篇文章我们学习了事件流stream_events,它是LangChain v1.3之后推荐的新API。而stream方法是更底层的流式输出API,它提供了三种不同的流模式,可以灵活组合使用。

简单来说,stream_events是一个"高级包装",stream是"底层原语"。如果你是新项目,官方推荐用stream_events;但stream的灵活性更强,有些场景下它更合适。

一、三种流模式

stream方法支持三种流模式:

模式用途
updates每个Agent步骤后输出状态更新,比如模型调用了什么工具、返回了什么结果
messages流式输出LLM生成的token,实现"打字机"效果
custom自定义数据,可以在工具内部输出任意进度信息

这三种模式可以单独使用,也可以组合使用。

二、Agent进度输出

使用stream_mode="updates"可以获取Agent每一步的状态变化。比如一个简单的天气查询,会经历三步:

  1. 模型节点:LLM决定调用get_weather工具
  2. 工具节点:执行get_weather工具,返回结果
  3. 模型节点:LLM根据工具结果生成最终回复
python
from langchain.agents import create_agent


def get_weather(city: str) -> str:
    """查询城市天气"""
    return f"{city}今天晴天,气温28°C~35°C"


agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    stream_mode="updates",
    version="v2",
):
    print(f"类型: {chunk['type']}")
    print(f"数据: {chunk['data']}")
    print()

输出效果类似:

类型: updates
数据: {'model': {'messages': [AIMessage(content='', tool_calls=[{'name': 'get_weather', 'args': {'city': '杭州'}, ...}])]}}

类型: updates
数据: {'tools': {'messages': [ToolMessage(content='杭州今天晴天,气温28°C~35°C', ...)]}}

类型: updates
数据: {'model': {'messages': [AIMessage(content='杭州今天天气晴朗,气温28°C到35°C,适合出门。')]}

每个chunk是一个字典,type表示流模式,data是具体的数据内容。

三、LLM Tokens流式输出

使用stream_mode="messages"可以逐token输出模型的回复,实现"打字机"效果:

python
from langchain.agents import create_agent


def get_weather(city: str) -> str:
    """查询城市天气"""
    return f"{city}今天晴天,气温28°C~35°C"


agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        # 输出token内容
        print(f"节点: {metadata['langgraph_node']}")
        print(f"内容: {token.content_blocks}")
        print()

输出效果类似:

节点: model
内容: [{'type': 'tool_call_chunk', 'name': 'get_weather', 'args': '', ...}]

节点: model
内容: [{'type': 'tool_call_chunk', 'name': None, 'args': '{"city', ...}]

节点: tools
内容: [{'type': 'text', 'text': '杭州今天晴天,气温28°C~35°C'}]

节点: model
内容: [{'type': 'text', 'text': '杭州'}]

节点: model
内容: [{'type': 'text', 'text': '今天'}]

节点: model
内容: [{'type': 'text', 'text': '天气晴朗'}]

每个chunk是一个(token, metadata)元组,token包含内容,metadata包含节点名称等信息。

四、流式输出推理过程

有些模型支持"深度思考"模式,比如DeepSeek。使用stream_mode="messages"并过滤reasoning类型的内容块,可以流式输出模型的推理过程:

python
from langchain.agents import create_agent


def get_weather(city: str) -> str:
    """查询城市天气"""
    return f"{city}今天晴天,气温28°C~35°C"


agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        for block in token.content_blocks:
            if block["type"] == "reasoning":
                print(f"[思考] {block['text']}", end="", flush=True)
            elif block["type"] == "text":
                print(block["text"], end="", flush=True)

输出效果类似:

[思考] 用户问杭州天气,我需要调用get_weather工具...
杭州今天晴天,气温28°C~35°C,适合出门。

不同模型的推理格式不一样(Anthropic叫thinking块,OpenAI叫reasoning摘要),但LangChain统一把它们标准化成了reasoning类型,你不用关心底层差异。

五、自定义数据流式输出

有些工具执行时间很长,比如处理大文件、调用外部API等,你可能想实时输出处理进度。这时候可以用get_stream_writer()在工具内部输出自定义数据:

python
import time

from langchain.agents import create_agent
from langgraph.config import get_stream_writer


def generate_report(topic: str) -> str:
    """生成研究报告"""
    writer = get_stream_writer()

    writer(f"正在收集{topic}相关资料...")
    time.sleep(2)

    writer(f"正在分析数据...")
    time.sleep(2)

    writer(f"正在撰写报告...")
    time.sleep(2)

    writer(f"报告生成完成!")
    return f"《{topic}研究报告》已生成"


agent = create_agent(model="deepseek-v4-flash", tools=[generate_report])

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "生成一份AI行业研究报告"}]},
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        print(chunk["data"])

输出效果:

正在收集AI行业相关资料...
正在分析数据...
正在撰写报告...
报告生成完成!

注意: 使用get_stream_writer()的工具只能在LangGraph执行环境中调用,单独调用会报错。

六、多模式组合使用

在实际项目中,你往往同时需要多种流模式。比如既想看Agent的进度更新,又想看自定义的进度信息。可以把stream_mode传一个列表:

python
from langchain.agents import create_agent
from langgraph.config import get_stream_writer


def get_weather(city: str) -> str:
    """查询城市天气"""
    writer = get_stream_writer()
    writer(f"正在查询{city}的天气数据...")
    return f"{city}今天晴天,气温28°C~35°C"


agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    stream_mode=["updates", "custom"],
    version="v2",
):
    print(f"模式: {chunk['type']}")
    print(f"数据: {chunk['data']}")
    print()

输出效果:

模式: custom
数据: 正在查询杭州的天气数据...

模式: updates
数据: {'model': {'messages': [AIMessage(...)]}}

模式: updates
数据: {'tools': {'messages': [ToolMessage(...)]}}

模式: updates
数据: {'model': {'messages': [AIMessage(...)]}}

七、流式输出工具调用

当LLM生成工具调用时,参数是逐步生成的(比如{"city": "杭州"}是分多个token生成的)。使用stream_mode="messages"可以拿到这些增量:

python
from langchain.agents import create_agent
from langchain.messages import AIMessageChunk


def get_weather(city: str) -> str:
    """查询城市天气"""
    return f"{city}今天晴天,气温28°C~35°C"


agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            # 输出文本增量
            if token.text:
                print(token.text, end="|", flush=True)
            # 输出工具调用参数增量
            if token.tool_call_chunks:
                print(token.tool_call_chunks)
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source == "tools":
                # 工具执行完成,输出结果
                print(f"工具结果: {update['messages'][-1].content_blocks}")

输出效果类似:

[{'name': 'get_weather', 'args': '', 'id': 'call_xxx', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'city', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '杭州', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
工具结果: [{'type': 'text', 'text': '杭州今天晴天,气温28°C~35°C'}]
杭州|今天|天气晴朗|,|气温|28°C|~|35°C|,|适合|出门|。|

八、子Agent的流式输出

在多Agent协作的场景中,你需要区分token是哪个Agent输出的。创建Agent时给它起个name,然后在流式输出中通过lc_agent_name来判断:

python
from langchain.agents import create_agent


def get_weather(city: str) -> str:
    """查询城市天气"""
    return f"{city}今天晴天,气温28°C~35°C"


# 创建天气Agent
weather_agent = create_agent(
    model="deepseek-v4-flash",
    tools=[get_weather],
    name="weather_agent",
)


def call_weather(query: str) -> str:
    """调用天气Agent"""
    result = weather_agent.invoke({"messages": [{"role": "user", "content": query}]})
    return result["messages"][-1].text


# 创建主Agent
agent = create_agent(
    model="deepseek-v4-flash",
    tools=[call_weather],
    name="supervisor",
)

current_agent = None
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    stream_mode=["messages", "updates"],
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        # 通过lc_agent_name判断当前是哪个Agent
        if agent_name := metadata.get("lc_agent_name"):
            if agent_name != current_agent:
                print(f"\n🤖 {agent_name}: ")
                current_agent = agent_name
        if token.text:
            print(token.text, end="|", flush=True)
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source == "tools":
                print(f"\n工具结果: {update['messages'][-1].content_blocks}")

输出效果类似:

🤖 supervisor:
[工具调用参数...]
🤖 weather_agent:
[工具调用参数...]
工具结果: [{'type': 'text', 'text': '杭州今天晴天,气温28°C~35°C'}]
杭州|今天|天气晴朗|,|适合|出门|。|
🤖 supervisor:
杭州|天气|查询|结果|:|晴天|,|28°C|~|35°C|。|

九、禁用流式输出

有些场景下你不需要流式输出,比如后端批量处理任务。可以在创建模型时设置streaming=False

python
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-5.4", streaming=False)

如果模型不支持streaming参数,可以用disable_streaming=True代替,这个参数在所有模型上都可用。

十、总结

stream方法提供了三种流模式,各有各的用途:

  • updates:看Agent的执行步骤,适合调试和展示进度
  • messages:逐token输出,适合前端"打字机"效果
  • custom:自定义输出,适合长时间任务的进度通知

三种模式可以组合使用,通过chunk["type"]来区分数据来源。

如果你是新项目,官方更推荐使用stream_eventsAPI(上一篇文章介绍的),它的类型化投影更方便使用。但stream的灵活性更强,理解它能帮你更好地掌握LangChain的流式机制。